package com.icesoft.core.common.helper.queue;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.task.TaskExecutor;

public class AsyncTaskQueueServer<T> {
	protected final Logger log = LoggerFactory.getLogger(this.getClass());
	private TaskExecutor taskExecutor;
	private ITaskDataService<T> taskDataService;
	private Thread thread;
	private volatile boolean start = false;
	private Semaphore semaphore;
	private ITaskDataPersistenceService<T> taskDataPersistenceService;
	private String threadName;
	private int taskSize;

	public AsyncTaskQueueServer(ITaskDataService<T> taskDataService, TaskExecutor taskExecutor) {
		this.taskDataService = taskDataService;
		this.taskExecutor = taskExecutor;
		this.taskDataPersistenceService = taskDataService.persistenceService();
		threadName = "QueueServer_" + taskDataService.taskKey();
	}

	public void start() {
		start(true);
	}

	public void setThreadName(String threadName) {
		this.threadName = threadName;
	}

	public synchronized void start(boolean deamon) {
		if (thread != null) {
			throw new RuntimeException("服务已启动了");
		}
		taskSize = taskDataService.limitThreadSize();
		semaphore = new Semaphore(taskSize);
		thread = new QueueThread();
		thread.setName(threadName);
		thread.setDaemon(deamon);
		thread.start();
	}

	public synchronized void stop() {
		start = false;
		log.info("停止{}处理服务", taskDataService.taskKey());
		if (thread == null) {
			throw new RuntimeException("服务未启动");
		}
		thread.interrupt();
		thread = null;
	}

	private class QueueThread extends Thread {

		private Queue<T> listLastRunning() {
			Queue<T> queue = null;
			if (taskDataPersistenceService != null) {
				List<T> list = taskDataPersistenceService.listLastRunning();
				if (list != null && !list.isEmpty()) {
					queue = new ConcurrentLinkedQueue<>(list);
				}
			}
			return queue;
		}

		@Override
		public void run() {
			start = true;
			log.info("启动{}处理服务", taskDataService.taskKey());
			Queue<T> queue = listLastRunning();
			while (start && !Thread.currentThread().isInterrupted()) {
				try {
					semaphore.acquire();
				} catch (InterruptedException e1) {
					log.error("{}队列任务中断异常");
					break;
				}
				T data = null;

				boolean isLastRunning = false;
				while (data == null) {
					try {
						if (queue != null && !queue.isEmpty()) {
							data = queue.poll();
						}
						if (data == null) {
							data = taskDataService.blockPop();
							isLastRunning = false;
							queue = null;
						} else {
							isLastRunning = true;
						}
					} catch (Exception e) {
						log.error("taskDataService blockPop 异常", e);
						return;
					}
					if (data == null) {
						try {
							Thread.sleep(100);
						} catch (InterruptedException e) {
							return;
						}
					}
				}
				if (taskDataPersistenceService != null && !isLastRunning) {
					taskDataPersistenceService.saveRunningData(data);
				}

				log.debug("提交任务数据：{}", data);
				taskExecutor.execute(new Task(data));
				if (log.isTraceEnabled()) {
					log.trace(taskDataService.taskKey() + "当前线程数：" + (taskSize - semaphore.availablePermits()));
				}
			}
		}

	}

	private class Task implements Runnable {
		private T data;

		Task(T data) {
			this.data = data;
		}

		@Override
		public void run() {
			try {
				taskDataService.handle(data);
				if (taskDataPersistenceService != null) {
					taskDataPersistenceService.deleteRunningData(data);
				}
			} catch (Exception e) {
				log.error("处理异常", e);
				try {
					taskDataService.exceptionHandle(data, e);
				} catch (Exception e2) {
					log.warn("处理失败：{}", data);
					log.error("exceptionHandle执行异常", e2);
				}
			} finally {
				semaphore.release();
			}
		}

	}

}
